package com.chensi.flink.util;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Splits each incoming Kafka message on the ASCII comma ({@code ","}) and emits the host name
 * and memory value it carries.
 *
 * <p>The second field (index 1) is emitted unchanged as the host name, and the third field
 * (index 2) is parsed as a {@code long} memory value. Any additional fields are ignored.
 * Malformed messages ({@code null}, fewer than three fields, or a third field that is not a
 * valid {@code long}) are skipped so that a single bad record does not fail the task.
 *
 * @author si.chen
 * @date 2020/8/29 13:36
 */
public class MessageSplitter implements FlatMapFunction<String, Tuple2<String, Long>> {
    private static final long serialVersionUID = -7113392535354788878L;

    /** Field separator used inside a message. */
    private static final String DELIMITER = ",";

    /** Minimum number of fields a message needs so that indexes 1 and 2 both exist. */
    private static final int MIN_FIELD_COUNT = 3;

    /**
     * Emits one {@code (hostName, memory)} tuple for a well-formed message, and nothing otherwise.
     *
     * @param value the raw message; may be {@code null}
     * @param out   collector that receives the resulting tuple
     * @throws Exception declared by the {@link FlatMapFunction} contract; malformed input is
     *                   skipped rather than reported through an exception
     */
    @Override
    public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
        if (value == null) {
            return;
        }

        // String.split drops trailing empty strings, so "a,b," yields only two fields;
        // the length check therefore also covers messages without any comma.
        String[] parts = value.split(DELIMITER);
        if (parts.length < MIN_FIELD_COUNT) {
            return;
        }

        final long memory;
        try {
            // Long.parseLong rejects surrounding whitespace, so trim the numeric field first.
            memory = Long.parseLong(parts[2].trim());
        } catch (NumberFormatException ignored) {
            // Non-numeric memory field: skip the record, consistent with how messages
            // lacking a comma were already dropped.
            return;
        }

        // The host name is emitted exactly as received so downstream keys are unchanged.
        out.collect(new Tuple2<>(parts[1], memory));
    }
}
